Overview

This notebook predicts ratings for new users using the model saved in the previous notebook.

Manufacture some ratings for a new user.


In [1]:
from pyspark.mllib.recommendation import Rating

new_user_ID = 0

new_user_ratings = [
     Rating(0,260,9),   # Star Wars (1977)
     Rating(0,1,8),     # Toy Story (1995)
     Rating(0,16,7),    # Casino (1995)
     Rating(0,25,8),    # Leaving Las Vegas (1995)
     Rating(0,32,9),    # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     Rating(0,335,4),   # Flintstones, The (1994)
     Rating(0,379,3),   # Timecop (1994)
     Rating(0,296,7),   # Pulp Fiction (1994)
     Rating(0,858,10) , # Godfather, The (1972)
     Rating(0,50,8)     # Usual Suspects, The (1995)
    ]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)

In [2]:
new_user_ratings_RDD.collect()


Out[2]:
[Rating(user=0, product=260, rating=9.0),
 Rating(user=0, product=1, rating=8.0),
 Rating(user=0, product=16, rating=7.0),
 Rating(user=0, product=25, rating=8.0),
 Rating(user=0, product=32, rating=9.0),
 Rating(user=0, product=335, rating=4.0),
 Rating(user=0, product=379, rating=3.0),
 Rating(user=0, product=296, rating=7.0),
 Rating(user=0, product=858, rating=10.0),
 Rating(user=0, product=50, rating=8.0)]

Load the original rating dataset


In [3]:
from pyspark.mllib.recommendation import Rating

ratings = sc.textFile('ratings.dat') \
               .map(lambda l: l.split("::")) \
               .map(lambda p: Rating(
                                  user = int(p[0]), 
                                  product = int(p[1]),
                                  rating = float(p[2]), 
                                  ))

Join the new user ratings with the orginal dataset


In [4]:
ratings = ratings.union(new_user_ratings_RDD)

Re-train the model


In [5]:
from pyspark.mllib.recommendation import ALS

rank = 50
numIterations = 20
lambdaParam = 0.1
model = ALS.train(ratings, rank, numIterations, lambdaParam)

Save the model

In production, training the model will happen in a batch process.


In [6]:
# if there is an existing model, delete it
!rm -rf ./recommender_model

# save the model
model.save(sc, './recommender_model')

Load the model

Let's load the model - production code would reload the model every time the model has been updated.


In [7]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model = MatrixFactorizationModel.load(sc, './recommender_model')

Predict the top 10 movies for the new user

Predict the top 10 movies for the new user based on their other movie ratings


In [8]:
new_user_rated_movie_ids = map(lambda x: x[1], new_user_ratings)

# new_user_rated_movied_ids = [260, 1, 16, 25, 32, 335, 379, 296, 858, 50]

new_user_unrated_movies_RDD = ratings.filter(lambda r: r.product not in new_user_rated_movie_ids) \
                                     .map(lambda x: (new_user_ID, x[0])) \
                                     .distinct()

Let's take a look at the new_user_unrated_movies_RDD data


In [9]:
new_user_unrated_movies_RDD.take(5)


Out[9]:
[(0, 378), (0, 1934), (0, 3282), (0, 5606), (0, 862)]

In [10]:
new_user_recommendations_RDD = model.predictAll(new_user_unrated_movies_RDD)

print(new_user_recommendations_RDD.take(10))


[Rating(user=0, product=1084, rating=7.41803123379617), Rating(user=0, product=3456, rating=7.250149438175512), Rating(user=0, product=3272, rating=5.4176207436700565), Rating(user=0, product=1040, rating=5.485268395607773), Rating(user=0, product=912, rating=8.179587907566159), Rating(user=0, product=140, rating=4.319208170261349), Rating(user=0, product=204, rating=3.435063927802238), Rating(user=0, product=956, rating=6.171745234206328), Rating(user=0, product=3436, rating=3.8179171571136195), Rating(user=0, product=492, rating=6.803288524222469)]

You would want to join the above data set to output the movie names and also filter out movies with less than X number of ratings.
See https://github.com/jadianes/spark-movie-lens/blob/master/notebooks/building-recommender.ipynb for more info.

Note that some of the ratings are bigger than 5.0.
For a possible explation, see http://stackoverflow.com/questions/29051520/apache-spark-als-recommendation

Predict how the user would rate a single movie

Predict how the user would rate this new movie


In [11]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = model.predictAll(my_movie)
individual_movie_rating_RDD.collect()


Out[11]:
[Rating(user=0, product=500, rating=5.518930176128497)]

In [ ]: